/*
 * FileName: ClickHouseSink.java
 * Author:   zzw
 * Date:     2019年01月28日
 * Description:
 */
package com.chezhibao.flink.sink;

import com.chezhibao.flink.job.RpcJob;
import com.chezhibao.flink.vo.RpcSqlVo;
import com.virtusai.clickhouseclient.ClickHouseClient;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;

/**
 * 〈〉<br>
 * 〈〉
 *
 * @author zzw
 * @see [相关类/方法]（可选）
 * @since [产品/模块版本]（可选）
 */
public class ClickHouseSink implements SinkFunction<RpcSqlVo> {

    private static Logger logger = LoggerFactory.getLogger(RpcJob.class);

    private static ClickHouseClient client = new ClickHouseClient("http://172.16.12.129:8123", "default", "123456");

    @Override
    public void invoke(RpcSqlVo value) throws Exception {
        try {
            if(value.getRowsRpc().size() > 0){
                client.post("INSERT INTO default.clotho_trace", value.getRowsRpc());
            }
        } catch (Exception e) {
            logger.error("click hosue insert error:",e);
            for (Object[] objects: (List<Object[]>) value){
                StringBuilder stringBuilder = null;
                for (Object o:objects){
                    stringBuilder = new StringBuilder();
                    stringBuilder.append(o);
                }
                logger.info(stringBuilder.toString());
            }
        }

        //client.close();
    }

    @Override
    public void invoke(RpcSqlVo value, Context context) throws Exception {

    }
}
